package com.guchenbo.flink;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Converts {@link Transaction} objects to and from a single comma-separated line of the form
 * {@code accountId,amount,timestamp}, where the timestamp is written with the pattern
 * {@code yyyy-MM-dd HH:mm:ss}.
 *
 * <p>No quoting or escaping is applied in either direction, so the format only round-trips
 * values whose text form contains no comma.
 */
public class TransactionCodec implements Serializable {

    /** Minimum number of comma-separated fields {@link #fromCsv(String)} needs to build a transaction. */
    private static final int FIELD_COUNT = 3;

    /** Timestamp layout shared by both directions; {@link DateTimeFormatter} is immutable and thread-safe. */
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Renders a transaction as {@code accountId,amount,timestamp}.
     *
     * <p>The timestamp pattern stops at seconds, so any finer-grained part of the timestamp is
     * not written out.
     *
     * @param transaction the transaction to encode
     * @return the comma-separated representation
     * @throws NullPointerException if {@code transaction} or its timestamp is {@code null}
     */
    public String toCsv(Transaction transaction) {
        return String.format("%s,%s,%s", transaction.getAccountId(), transaction.getAmount(),
                        transaction.getTimestamp().format(FORMATTER));
    }

    /**
     * Parses a line produced by {@link #toCsv(Transaction)} back into a transaction.
     *
     * <p>Leading and trailing whitespace around each field (including a stray line terminator at
     * the end of the line) is ignored. Fields after the third are ignored.
     *
     * @param csv a line of the form {@code accountId,amount,timestamp}
     * @return the decoded transaction
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if the account id is not a valid {@code long} or the amount
     *         is not a valid {@code int}
     * @throws java.time.format.DateTimeParseException if the timestamp does not match
     *         {@code yyyy-MM-dd HH:mm:ss}
     * @throws NullPointerException if {@code csv} is {@code null}
     */
    public Transaction fromCsv(String csv) {
        String[] fields = csv.split(",");
        // split() drops trailing empty strings, so "1,2," yields only two fields; fail with a
        // message that names the problem instead of an ArrayIndexOutOfBoundsException.
        if (fields.length < FIELD_COUNT) {
            throw new IllegalArgumentException("Expected " + FIELD_COUNT
                            + " comma-separated fields (accountId,amount,timestamp) but found "
                            + fields.length + " in: \"" + csv + "\"");
        }

        // Trim so padded values or a trailing '\r' do not break numeric/timestamp parsing.
        long accountId = Long.parseLong(fields[0].trim());
        int amount = Integer.parseInt(fields[1].trim());
        LocalDateTime timestamp = LocalDateTime.parse(fields[2].trim(), FORMATTER);

        return Transaction.builder().accountId(accountId).amount(amount).timestamp(timestamp).build();
    }
}
